package com.ferry.jraft.impl;

import com.ferry.jraft.ConsensusModule;
import com.ferry.jraft.enums.NodeStatusEnum;
import com.ferry.jraft.model.LogEntry;
import com.ferry.jraft.model.dto.AppendEntriesRequest;
import com.ferry.jraft.model.dto.AppendEntriesResponse;
import com.ferry.jraft.model.dto.VoteRequest;
import com.ferry.jraft.model.dto.VoteResponse;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.util.concurrent.locks.ReentrantLock;

/**
 * @Author ferry
 * @create 2022/5/21 14:17
 * @description
 */
@Slf4j
@Data
public class ConsensusImpl implements ConsensusModule {

    public final NodeImpl node;

    public final ReentrantLock voteLock = new ReentrantLock();

    public final ReentrantLock appendEntriesLock = new ReentrantLock();

    public ConsensusImpl(NodeImpl node) {
        this.node = node;
    }

    /**
     * 接收者请求投票的实现
     * 1. 如果term<currentTerm，即candidate的任期小于接收者的任期，响应false
     * 2. 如果votedFor为空或者为candidateId，并且候选人的日志至少和自己一样新，那么就投票给他
     *
     * @param voteRequest
     * @return
     */
    @Override
    public VoteResponse requestVote(VoteRequest voteRequest) {
        VoteResponse response;
        try {
            //按照先来先服务的原则进行投票，如果票已经投出去了，拒绝其他所有投票请求
            if (!voteLock.tryLock()) {
                log.warn("Node {}: Since self has voted, vote request from Node {} is rejected", node.peerConfig.getId(), voteRequest.getCandidateId());
                return new VoteResponse(node.getPersistentState().getCurrentTerm(), false);
            }

            //1. 如果term<currentTerm，即candidate的任期小于接收者的任期，响应false
            if (voteRequest.getTerm() < node.getPersistentState().getCurrentTerm()) {
                log.warn("Node {}: Since term {} of request from Node {} < currentTerm {}, vote request is rejected", node.peerConfig.getId(), voteRequest.getTerm(), voteRequest.getCandidateId(), node.persistentState.getCurrentTerm());
                return new VoteResponse(node.getPersistentState().getCurrentTerm(), false);
            }

            //2. 如果votedFor为0或者为candidateId，并且候选人的日志至少和自己一样新，那么就投票给他
            if (0 == (node.getPersistentState().getVotedFor()) || node.getPersistentState().getVotedFor() == (voteRequest.getCandidateId())) {
                LogEntry entry = node.getPersistentState().logModule.getLast();
                if (entry != null) {
                    LogEntry tmpLast = LogEntry.builder().term(voteRequest.getLastLogTerm()).index(voteRequest.getLastLogIndex()).build();
                    //如果请求方日志没有自身的日志新，返回false
                    if (entry.compareTo(tmpLast) > 0) {
                        return new VoteResponse(node.getPersistentState().getCurrentTerm(), false);
                    }
                }
                //投票给请求者
                //首先转变为follower
                node.setNodeStatus(NodeStatusEnum.FOLLOWER);
                //更新集群信息
                node.peerGroup.setLeader(node.peerGroup.searchById(voteRequest.getCandidateId()));
                node.persistentState.setCurrentTerm(voteRequest.getTerm());
                //votedFor设置为candidate的id
                node.persistentState.setVotedFor(voteRequest.getCandidateId());

                log.info("Node {}: Vote for Node {}", node.peerConfig.getId(), voteRequest.getCandidateId());
                return new VoteResponse(node.persistentState.getCurrentTerm(), true);
            }
            log.warn("Node {}: VotedFor is {}, reject to vote for Node {}", node.peerConfig.getId(), node.persistentState.getVotedFor(), voteRequest.getCandidateId());
            return new VoteResponse(node.persistentState.getCurrentTerm(), false);
        } finally {
            voteLock.unlock();
        }

    }

    /**
     * 接收者追加日志的实现
     * 1. 如果leader的term<currentTerm，即当前leader的任期小于返回值中的term，响应false
     * 2. 如果接收者的日志条目不匹配参数值中的prevLogIndex、prevLogTerm，响应false
     * 3. 如果已有的日志条目与新接收到的日志条目产生了冲突(索引相同但任期不同)，删除掉已有的该条目及其之后的所有日志条目
     * 4. 追加日志中尚未存在的新条目
     * 5. 如果leaderCommit>commitIndex，即leader已提交的日志条目的最大索引大于接收者已提交的日志条目的最大索引，则重设接收者的commitIndex为min(leaderCommit，上一个新条目的索引)
     *
     * @param appendEntriesRequest
     * @return
     */
    @Override
    public AppendEntriesResponse appendEntries(AppendEntriesRequest appendEntriesRequest) {
        try {
            //如果已经在处理追加日志请求，那么直接返回false
            if (!appendEntriesLock.tryLock()) {
                log.warn("Node {}: Get append entries lock failed", node.peerConfig.getId());
                return new AppendEntriesResponse(node.persistentState.getCurrentTerm(), false);
            }
            //1. 如果leader的term<currentTerm，即当前leader的任期小于返回值中的term，响应false
            if (appendEntriesRequest.getTerm() < node.persistentState.getCurrentTerm()) {
                log.info("Node {}: Since currentTerm:{} > request term:{}, append entries request from Node {} is rejected", node.peerConfig.getId(), node.persistentState.getCurrentTerm(), appendEntriesRequest.getTerm(), appendEntriesRequest.getLeaderId());
                return new AppendEntriesResponse(node.persistentState.getCurrentTerm(), false);
            }
            node.setPrevElectionTime(System.currentTimeMillis());
            node.setPrevHeartBeatTime(System.currentTimeMillis());
            node.peerGroup.setLeader(node.peerGroup.searchById(appendEntriesRequest.getLeaderId()));

            node.setNodeStatus(NodeStatusEnum.FOLLOWER);
            node.persistentState.setCurrentTerm(appendEntriesRequest.getTerm());
            node.persistentState.setVotedFor(0);

            //处理心跳
            if (appendEntriesRequest.getEntries() == null || appendEntriesRequest.getEntries().size() == 0) {
                log.info("Node {}: Receive heartbeat from leader Node {}", node.peerConfig.getId(), appendEntriesRequest.getLeaderId());
                return new AppendEntriesResponse(node.persistentState.getCurrentTerm(), true);
            }

            //   如果日志不为空
            //2. 如果接收者的日志条目不匹配参数值中的prevLogIndex、prevLogTerm，响应false
            log.info("Node {}: Receive append entries request from Node {}, logs:{}", node.peerConfig.getId(), appendEntriesRequest.getLeaderId(), appendEntriesRequest.getEntries());
            if (node.persistentState.logModule.getLastIndex() != 0 && appendEntriesRequest.getPrevLogIndex() != 0) {
                LogEntry entry;
                if ((entry = node.persistentState.logModule.read(appendEntriesRequest.getPrevLogIndex())) != null) {
                    if (entry.getTerm() != appendEntriesRequest.getPrevLogTerm()) {
                        return new AppendEntriesResponse(node.persistentState.getCurrentTerm(), false);
                    }
                } else {
                    return new AppendEntriesResponse(node.persistentState.getCurrentTerm(), false);
                }
            }

            //3. 如果已有的日志条目与新接收到的日志条目产生了冲突(索引相同但任期不同)，删除掉已有的该条目及其之后的所有日志条目
            LogEntry firstNewEntry = node.persistentState.logModule.read(appendEntriesRequest.getPrevLogIndex() + 1);
            if (firstNewEntry != null && firstNewEntry.getTerm() != appendEntriesRequest.getEntries().get(0).getTerm()) {
                node.persistentState.logModule.removeOnStartIndex(appendEntriesRequest.getPrevLogIndex() + 1);
            } else if (firstNewEntry != null) {
                //如果未发生冲突且新索引位置上已经有了相同的日志，直接返回true，不再重复追加
                return new AppendEntriesResponse(node.persistentState.getCurrentTerm(), true);
            }

            //4. 追加日志中尚未存在的新条目
            for (LogEntry logEntry : appendEntriesRequest.getEntries()) {
                node.persistentState.logModule.write(logEntry);
            }

            long prevCommitIndex = node.volatileState.getCommitIndex();

            //5. 如果leaderCommit>commitIndex，即leader已提交的日志条目的最大索引大于接收者已提交的日志条目的最大索引，
            //   则重设接收者的commitIndex为min(leaderCommit，上一个新条目的索引)
            if (appendEntriesRequest.getLeaderCommit() > node.volatileState.getCommitIndex()) {
                int commitIndex = (int) Math.min(appendEntriesRequest.getLeaderCommit(), node.persistentState.logModule.getLastIndex());
                node.volatileState.setCommitIndex(commitIndex);
            }

            //提交日志
            for (long i = prevCommitIndex + 1; i <= node.volatileState.getCommitIndex(); i++) {
                node.commitChannel.offer(node.persistentState.logModule.read(i));
            }
            return new AppendEntriesResponse(node.persistentState.getCurrentTerm(), true);
        } finally {
            appendEntriesLock.unlock();
        }
    }
}
